<?php

Class AsyncTcpServer
{

    protected $protocol;
    protected $host;
    protected $port;
    protected $errorNo;
    protected $errorMsg;

    protected $serverSocket;

    /**
     * @var EventLoop   $eventLoop
     */
    protected $eventLoop;

    protected $requestConnections;
    protected $clientConnections;

    protected $registerActions = array();

    /**
     * AwaitTcpServer constructor.
     *
     * @param string    $host
     * @param integer   $port
     */
    public function __construct($host, $port, $eventloop = null)
    {
        $this->host = $host;
        $this->port = $port;
        $this->protocol = 'tcp';
        $this->serverSocket = null;
        $this->eventLoop = $eventloop;
        $this->connections = array();
        $this->requestConnections = array();
        $this->clientConnections = array();

        $this->registerActions = array(
            'accept' => function () {},
            'receive' => function () {},
            'close' => function () {},
        );
    }

    /**
     * @return array
     */
    public function getConnections()
    {
        return array_merge(array($this->serverSocket), $this->requestConnections, $this->clientConnections);
    }

    /**
     * @param String  $action
     * @param Closure $closure
     */
    public function on($action, Closure $closure)
    {
        $this->registerActions[$action] = $closure;
    }

    /**
     * @param string    $str
     * @param integer   $len
     */
    public function console($str, $len = null)
    {
        fwrite(STDOUT, $str);
    }

    private function onClose($socket, $info)
    {
        $this->registerActions['close'] ($this, $socket, $info);

    }

    /**
     * 初始化server
     *
     * @return false|resource|null
     */
    public function init()
    {
        $this->serverSocket = @stream_socket_server($this->protocol . '://' . $this->host . ':' . $this->port, $this->errorNo, $this->errorMsg);
        if ($this->serverSocket === false) {
            throw new \RuntimeException("fail to listen on port: {$this->port}!");
        }
        $this->console("socket server is listening : {$this->host}:{$this->port}" . PHP_EOL);
        stream_set_blocking($this->serverSocket, false);
        return $this->serverSocket;
    }

    /**
     * 等待handler 新的接入
     *
     * @param int $timeout
     * @return array
     */
    public function accept($timeout = 0)
    {
        $pear = '';
        $timeout = empty($this->requestConnections) ? -1 : 0;

        if ($c = @stream_socket_accept($this->serverSocket, $timeout, $peer)) {
            stream_set_blocking($c, false);

            $this->requestConnections[$peer] = $c;
        }
        return array($c, $peer);
    }

    /**
     * @param resource  $requestSocket
     * @param string    $peerName
     *
     * @return mixed
     */
    public function acceptCallBack($requestSocket, $peerName)
    {
        // user callback
        return $this->registerActions['accept'] ($this, $requestSocket, array('pearName' => $peerName));
    }

    /**
     * 客户端请求内容
     *
     * @return array
     */
    public function receive()
    {
        foreach ($this->requestConnections as $name => $requestConnection) {
            $peer = stream_socket_get_name($requestConnection, true);
            $contents = $part = '';

            do {
                $part = fread($requestConnection, 4096);
                $contents .= $part;
            } while ($part);
            // user callback
            if ($contents) {
                return array($requestConnection, $contents, $peer);
            }
        }

        return array(null, null, null);
    }

    /**
     * @param $requestSocket
     * @param $contents
     * @param $peer
     *
     * @return mixed
     */
    public function receiveCallback($requestSocket, $contents, $peer)
    {
        return $this->registerActions['receive'] ($this, $requestSocket, array('pearName' => $peer), $contents);
    }

    /**
     * @param $socket
     * @return bool
     */
    public function close($socket)
    {
        foreach ($this->requestConnections as $requestConnection) {
            $peer = stream_socket_get_name($requestConnection, true);
            if (isset($this->requestConnections[$peer])) {
                unset($this->requestConnections[$peer]);
            }
        }

        // client response
        foreach ($this->clientConnections as $clientConnection) {
            $peer = stream_socket_get_name($clientConnection, true);
            if (isset($this->clientConnections[$peer])) {
                unset($this->clientConnections[$peer]);
            }
        }
        return fclose($socket);
    }

}

/**
 * Class SyncTcpClient
 */
class AsyncTcpClient
{
    protected $protocol;
    protected $host;
    protected $port;

    /**
     * @var EventLoop   $eventLoop
     */
    protected $eventLoop;

    /**
     * @var resource    $clientConnection
     */
    protected $clientConnection;

    /**
     * SyncTcpClient constructor.
     */
    public function __construct($eventLoop = null)
    {
        $this->protocol = 'tcp';
        $this->eventLoop = $eventLoop;
    }

    /**
     * @param      $host
     * @param      $port
     * @param null $timeout
     * @return false|resource
     * @throws \RuntimeException
     */
    public function connection($host, $port, $timeout = null)
    {
        $this->host = $host;
        $this->port = $port;

        if (null === $timeout) {
            $timeout = ini_get("default_socket_timeout");
        }
        $this->timeout = $timeout;

        $this->clientConnection = stream_socket_client("{$this->protocol}://{$this->host}:{$this->port}", $errorNo, $errorMsg, $timeout, STREAM_CLIENT_CONNECT);
        if ($this->clientConnection === false) {
            throw new \RuntimeException("unable to create socket: " . $errorMsg . ', code: ' . $errorNo);
        }
//        EventLoop::inst()->addClientStream($this->clientConnection);
        return $this->clientConnection;
    }

    /**
     * @return array
     */
    public function getConnections()
    {
        return array($this->clientConnection);
    }

    public function send($data)
    {
        return fwrite($this->clientConnection, $data);
    }

    public function response($length = 1024)
    {
        $contents = '';
        $streamName = strval($this->clientConnection);
        return new Promise($streamName, $this, array($this, 'read'), array($length));
    }

    public function read($length = 1024)
    {
        $part = $contents = $peer = '';
        while (!feof($this->clientConnection)) {
            $part = fread($this->clientConnection, $length);
            $contents .= $part;
            $peer = stream_socket_get_name($this->clientConnection, true);
        }
        return array($this->clientConnection, $contents, $peer);
    }

    public function close()
    {
        if ($this->clientConnection) {
            fclose($this->clientConnection);
        }
    }

}

/**
 *
 * Class EventLoop
 */
class EventLoop
{
    private static $inst;
    private static $currentStack;

    protected $allStreams;

    protected $clientStreams;
    protected $requestStreams;
    protected $serverStreams;
    protected $promises;
    protected $streamNames;

    protected $mainProcess;


    protected $serverHandlers;
    protected $clientHandlers;

    /**
     * EventLoop constructor.
     */
    private function __construct()
    {
        $this->streams = array();

        $this->clientStreams = array();
        $this->requestStreams = array();
        $this->serverStreams = array();

        $this->serverHandlers = array();
        $this->clientHandlers = array();
        $this->promises = array();
    }

    /**
     * @return EventLoop
     */
    public static function inst()
    {
        if (empty(self::$inst)) {
            self::$inst = new self();
        }
        return self::$inst;
    }

    public static function __callStatic($name, $arguments)
    {
        return self::inst()->{$name}($arguments);
    }

    /**
     * @param $num
     * @param $stack
     * @return string
     */
    public static function setCurrentStack($num, $stack)
    {
        self::$currentStack = $stack . '_' . $num ;
        return self::$currentStack;
    }

    public function listen($name, Promise $promise)
    {
//        $this->promises[$name] = array('promise' => $promise, 'runtime' => self::$currentStack);
        $promise->setCurrentStack(self::$currentStack);
        $this->promises[$name] = $promise;
        $this->clientStreams[$name] = $promise->getstream();
        $this->clientHandlers[$name] = $promise->getHandler();
    }

    public function addServerStreams($stream)
    {
        $this->addStream($stream, 'server');
        return $this;
    }

    public function removeServerStream($stream)
    {
        $this->removeStream($stream, 'server');
        return $this;
    }

    public function addClientStream($stream)
    {
        $this->addStream($stream, 'client');
        return $this;
    }

    public function removeClientStream($stream)
    {
        $this->removeStream($stream, 'client');
        return $this;
    }

    public function addRequestStream($stream, $peerName = '')
    {
        $this->addStream($stream, 'request', $peerName);
        return $this;
    }

    /**
     * @param resource  $stream
     */
    public function addStream($stream, $type, $peerName = '')
    {
        $streamName = strval($stream);
        $this->streams[$type . '_' . $streamName] = $stream;

        $this->streamNames[$streamName] = $streamName;
        $this->promises[$streamName] = new Promise($stream, $this, function() {}, array());

        if ('client' == $type) {
            $this->clientStreams[$streamName] = $stream;
        } elseif ('request' == $type) {
            $this->requestStreams[$streamName] = $stream;
        } elseif ('server' == $type) {
            $this->serverStreams[$streamName] = $streamName;
        }
    }

    /**
     * @param $stream
     * @param $type
     */
    public function removeStream($stream, $type)
    {
        $streamName = strval($stream);
        $this->streams[$type . '_' . $streamName] = $stream;

        unset($this->streamNames[$streamName]);
        unset($this->promises[$streamName]);

        if ('client' == $type) {
            unset($this->clientStreams[$streamName]);
        } elseif ('request' == $type) {
            unset($this->requestStreams[$streamName]);
        } elseif ('server' == $type) {
            unset($this->serverStreams[$streamName]);
        }
    }

    /**
     * @return mixed
     */
    public function getServerHandlers()
    {
        return $this->serverHandlers;
    }

    /**
     * @param $serverHandlers
     * @return $this
     */
    public function addServerHandlers($serverHandlers)
    {
        $this->serverHandlers[] = $serverHandlers;
        return $this;
    }

    /**
     * @return mixed
     */
    public function getClientHandlers()
    {
        return $this->clientHandlers;
    }

    /**
     * @param $clientHandlers
     * @return $this
     */
    public function addClientHandlers($clientHandlers)
    {
        $this->clientHandlers[] = $clientHandlers;
        return $this;
    }

    public function getPromises()
    {
        return $this->promises;
    }

    /**
     * @param $stream
     * @return |null
     */
    public function getPromise($stream)
    {
        $streamName = strval($stream);
        if (isset($this->promises[$streamName])) {
            return $this->promises[$streamName];
        } else {
            return null;
        }
    }

    /**
     * @param Promise $promise
     * @param         $data
     * @return bool
     * @throws Exception
     */
    public function injection(Promise $promise, $data)
    {
        $stack = $promise->getCurrentStack();
        if (isset($this->mainProcess[$stack])) {
            /**
             * @var Generator   $gen
             */
            $gen = $this->mainProcess[$stack];
            return $gen->send($data);
        } else {
            throw new \Exception('未知 Stack ' . $stack);
        }
        return false;
    }

    private function select(&$read, &$write, &$except, $timeout)
    {
        return stream_select($read, $write, $except, $timeout);
    }

    /**
     * 初始化所有 handler
     *
     * @return $this
     */
    public function init()
    {
        /**
         * @var AsyncTcpServer  $handler
         */
        $handler = null;
        foreach ($this->serverHandlers as $handler) {
            $handler->init();
        }
        return $this;
    }

    /**
     * 开始事件循环
     *
     * @throws Exception
     */
    public function loop()
    {
        $keepRun = true;
        while ($keepRun) {
            $allStreams = array();
            /**
             * @var AsyncTcpServer  $handler
             */
            $handler = null;
            foreach ($this->serverHandlers as $handler) {
                $allStreams = array_merge($allStreams, $handler->getConnections());
            }

            /**
             * @var AsyncTcpClient  $clientHandler
             */
            $clientHandler = null;
            foreach ($this->clientHandlers as $clientHandler) {
                $allStreams = array_merge($allStreams, $clientHandler->getConnections());
            }
//            $allStreams = array_merge($this->serverStreams, $this->requestStreams, $this->clientStreams);
            if ($this->select($allStreams, $write, $except, 6)) {
                $ifContinue = false;

                /**
                 * @var AsyncTcpServer  $serverHandler
                 */
                $serverHandler = null;
                foreach ($this->serverHandlers as $key => $serverHandler) {
                    list($stream, $peer) = $serverHandler->accept(empty($allStreams) ? -1 : 0);
                    if (empty($peer)) {
                        // 没有新请求
                        continue;
                    }
                    $streamName = strval($stream);
                    $processName = self::setCurrentStack($streamName, 'accept');
                    $gen = $serverHandler->acceptCallBack($stream, $peer);
                    if ($gen instanceof Generator) {
                        $gen->current();
                        $this->mainProcess[$processName] = $gen;
                    }
                }

                foreach ($this->serverHandlers as $key => $serverHandler) {
                    list($stream,  $data, $peer) = $serverHandler->receive();
                    if (empty($peer))
                        continue;
                    $streamName = strval($stream);
                    $processName = self::setCurrentStack($streamName, 'receive');
                    $gen = $serverHandler->receiveCallback($stream, $data, $peer);
                    if ($gen instanceof Generator) {
                        $gen->current();
                        $this->mainProcess[$processName] = $gen;
                    }
                    if ($data) {
                        $ifContinue = false;
                        break;
                    }
                }

                if ($ifContinue)
                    continue;

                /**
                 * @var Promise $promise
                 */
                $promise = null;
                foreach ($this->promises as $promise) {
                    $stream = $promise->getStream();
                    /**
                     * @var AsyncTcpClient  $handler
                     */
                    $handler = $promise->getHandler();
                    list($stream, $data, $peer) = $handler->read();
                    if ($data) {
//                        Async::inject($stream, $promise, $data);
                        $stack = $promise->getCurrentStack();
                        $gen = $this->mainProcess[$stack];
                        if ($gen instanceof Generator) {
                            $promise->resolve($data);
                            $dataWrapper = array('data' => $data, 'streamName' => strval($stream));
                            $gen->send($dataWrapper);
                        }
                        break;
                    }
                }
            }  // end event loop
        }

    }
}

class Promise
{
    protected $streamName;
    protected $stream;
    protected $func;
    protected $args;
    protected $key;
    protected $data;
    protected $currentStack;
    protected $handler;

    public function __construct($key, $handler, callable $func, ...$args)
    {
//        $streamName = strval($stream);
//        $this->streamName = $streamName;
        $this->func = $func;
        $this->args = $args;
        $this->key = $key;
        $this->streamName = $key;
        $this->handler = $handler;
    }

    public function resolve($data)
    {
        $this->data = $data;
    }

    public function getKey()
    {
        return $this->key;
    }

    public function getData()
    {
        return $this->data;
    }

    public function getStream()
    {
        return $this->stream;
    }

    public function getStreamName()
    {
        return $this->streamName;
    }

    public function getHandler()
    {
        return $this->handler;
    }

    /**
     * @return mixed
     */
    public function getCurrentStack()
    {
        return $this->currentStack;
    }

    /**
     * @param mixed $currentStack
     */
    public function setCurrentStack($currentStack): void
    {
        $this->currentStack = $currentStack;
    }

}

class Async
{
    protected static $streams;
    protected static $generators;
    protected static $params;

    /**
     * @param callable $func
     * @param mixed    ...$args
     *
     * @return mixed|Promise
     */
    public static function call(callable $func, ...$args)
    {
        self::$generators[] = $func;
        $key =key(self::$generators);
        self::$params[$key] = $args;
//        return new Promise($key, $func, $args);
        $promise = call_user_func($func, ...$args);
        EventLoop::inst()->listen($promise->getStreamName(), $promise);
        return $promise;
    }


    public static function all(...$promises)
    {

        $re = array();
        foreach ($promises as $promise)
        {
            $re[] = yield;
        }

        /**
         * @var Promise $promise
         */
        $promise = null;
        foreach ($promises as $promise) {
            foreach ($re as $param) {
                if ($param['streamName'] == $promise->getStreamName()) {
                    $results[] = $param['data'];
                }
            }
        }
        return $results;
    }

    public static function realCall(Promise $promise)
    {
        $key = $promise->getKey();
        if (isset(self::$generators[$key])) {
            $args = self::$params[$key];
            $handle = (self::$generators[$key]) ($args);
        }

    }

    public static function await(Promise $promise)
    {
        EventLoop::inst()->listen($promise->getStreamName(), $promise);
        self::realCall($promise);

        $re = yield;

        $promise->resolve($re['data']);

        return $promise->getData();
    }

    public static function extract($params, ...$promises)
    {
        $result = array();
        /**
         * @var Promise $promise
         */
        $promise = null;
        foreach ($promises as $promise) {
            foreach ($params as $param) {
                if ($param['streamName'] == $promise->getStreamName()) {
                    $results[] = $param['data'];
                }
            }
        }
        return $results;
    }
}

